import 'dart:async';

import 'package:rxdart/rxdart.dart';

import '../../common/test_page.dart';
import '../utils.dart';

/// Creates a fresh single-subscription stream that emits 0, 1, 2 and 3 in
/// order and then closes.
///
/// A new stream is built on every call so each caller gets its own
/// listenable instance.
Stream<int> _getStream() {
  return Stream<int>.fromIterable(const <int>[0, 1, 2, 3]);
}

/// The integers 0 through 3 in ascending order, used by the tests in this
/// file as the reference sequence of recovered events.
const expected = <int>[0, 1, 2, 3];

/// Test page exercising rxdart's error-recovery stream operators:
/// `onErrorResumeNext`, `onErrorResume`, `onErrorReturn` and
/// `onErrorReturnWith`.
///
/// Every case is registered from the constructor through `test`, and values
/// are reported through the single-argument `expect`. Both helpers are
/// declared outside this file (presumably in `test_page.dart`), so their exact
/// semantics should be confirmed there.
class OnErrorTestPage extends TestPage {
  /// Creates the page and registers all error-operator test cases.
  OnErrorTestPage(super.title) {
    // ---------------------------------------------------------------------
    // onErrorResumeNext / onErrorResume
    // ---------------------------------------------------------------------

    test('Rx.onErrorResumeNext', () async {
      // The source only emits an error; events come from the fallback stream.
      Stream<int>.error(Exception())
          .onErrorResumeNext(_getStream())
          .listen((result) {
        expect(result);
      });
    });

    test('Rx.onErrorResume', () async {
      // Same as above, but the fallback is produced lazily from the error.
      Stream<int>.error(Exception())
          .onErrorResume((e, st) => _getStream())
          .listen((result) {
        expect(result);
      });
    });

    test('Rx.onErrorResume.correctError', () async {
      final exception = Exception();

      // The recovery callback forwards the error object it receives as data.
      expect(
        Stream<Object>.error(exception)
            .onErrorResume((e, st) => Stream.value(e)),
      );
    });

    test('Rx.onErrorResumeNext.asBroadcastStream', () async {
      final stream = Stream<int>.error(Exception())
          .onErrorResumeNext(_getStream())
          .asBroadcastStream();

      expect(stream.isBroadcast);

      // A broadcast stream accepts more than one listener.
      stream.listen((result) {
        expect(result);
      });
      stream.listen((result) {
        expect(result);
      });
    });

    test('Rx.onErrorResumeNext.error.shouldThrow', () async {
      // Both the source and the recovery stream fail, so the error callback
      // is the only handler registered here.
      final streamWithError = Stream<void>.error(Exception())
          .onErrorResumeNext(Stream<void>.error(Exception()));

      streamWithError.listen(null, onError: (Object e, StackTrace s) {
        expect(e);
      });
    });

    test('Rx.onErrorResumeNext.pause.resume', () async {
      final transformer =
          OnErrorResumeStreamTransformer<int>((_, __) => _getStream());
      final exp = const [50] + expected;
      late StreamSubscription<num> subscription;
      // Number of data events received so far.
      var count = 0;

      subscription = Rx.merge([
        Stream.value(50),
        Stream<int>.error(Exception()),
      ]).transform(transformer).listen((result) {
        expect(result);
        count++;

        // Once every event in `exp` has been seen, release the subscription.
        // Without the increment above this condition could never hold.
        if (count == exp.length) {
          subscription.cancel();
        }
      });

      subscription.pause();
      subscription.resume();
    });

    test('Rx.onErrorResumeNext.close', () async {
      Stream<int>.error(Exception()).onErrorResumeNext(_getStream()).listen(
        (result) {
          expect(result);
        },
        onDone: () {
          expect(true);
        },
      );
    });

    test('Rx.onErrorResumeNext.noErrors.close', () async {
      // The source closes without ever emitting an error.
      expect(
        Stream<int>.empty().onErrorResumeNext(_getStream()),
      );
    });

    test('OnErrorResumeStreamTransformer.reusable', () async {
      // One transformer instance is applied to two independent sources.
      final transformer = OnErrorResumeStreamTransformer<int>(
        (_, __) => _getStream().asBroadcastStream(),
      );

      Stream<int>.error(Exception()).transform(transformer).listen((result) {
        expect(result);
      });

      Stream<int>.error(Exception()).transform(transformer).listen((result) {
        expect(result);
      });
    });

    test('Rx.onErrorResume accidental broadcast', () async {
      final controller = StreamController<int>();

      final stream =
          controller.stream.onErrorResume((_, __) => Stream<int>.empty());

      // The controller is single-subscription; a second listen is handed to
      // `expect` as a closure rather than being invoked directly here.
      stream.listen(null);
      expect(() => stream.listen(null));

      controller.add(1);
    });

    test('Rx.onErrorResumeNext accidental broadcast', () async {
      final controller = StreamController<int>();

      final stream = controller.stream.onErrorResumeNext(Stream<int>.empty());

      stream.listen(null);
      expect(() => stream.listen(null));

      controller.add(1);
    });

    test(
        'Rx.onErrorResume still adds data when Stream emits an error: issue/616',
        () {
      // Scenario 1: recovery through the onErrorResume callback.
      {
        final stream = Rx.concat<int>([
          Stream.value(1),
          Stream.error(Exception()),
          Stream.fromIterable([2, 3]),
          Stream.error(Exception()),
          Stream.value(4),
        ]).onErrorResume((e, s) => Stream.value(-1));
        expect(stream);
      }

      // Scenario 2: the same source, recovered through onErrorResumeNext.
      {
        final stream = Rx.concat<int>([
          Stream.value(1),
          Stream.error(Exception()),
          Stream.fromIterable([2, 3]),
          Stream.error(Exception()),
          Stream.value(4),
        ]).onErrorResumeNext(Stream.value(-1));
        expect(stream);
      }
    });

    // NOTE(review): the name says onErrorResumeNext but the body exercises
    // onErrorResume; the name is left untouched so test identifiers stay
    // stable.
    test('Rx.onErrorResumeNext with many errors', () {
      final stream = Rx.concat<int>([
        Stream.value(1),
        Stream.error(Exception()),
        Stream.value(2),
        Stream.error(StateError('')),
        Stream.value(3),
      ]).onErrorResume((e, s) {
        // Each error kind maps to its own delayed sentinel value.
        if (e is Exception) {
          return Rx.timer(-1, const Duration(milliseconds: 100));
        }
        if (e is StateError) {
          return Rx.timer(-2, const Duration(milliseconds: 200));
        }
        throw e;
      });
      expect(stream);
    });

    test('Rx.onErrorResumeNext.nullable', () {
      nullableTest<String?>(
        (s) => s.onErrorResumeNext(Stream.empty()),
      );
    });

    // ---------------------------------------------------------------------
    // onErrorReturn
    // ---------------------------------------------------------------------

    test('Rx.onErrorReturn', () async {
      Stream<num>.error(Exception()).onErrorReturn(0).listen((num result) {
        expect(result);
      });
    });

    test('Rx.onErrorReturn.asBroadcastStream', () async {
      final stream =
          Stream<num>.error(Exception()).onErrorReturn(0).asBroadcastStream();

      expect(stream.isBroadcast);

      stream.listen((num result) {
        expect(result);
      });

      stream.listen((num result) {
        expect(result);
      });
    });

    test('Rx.onErrorReturn.pause.resume', () async {
      late StreamSubscription<num> subscription;

      subscription =
          Stream<num>.error(Exception()).onErrorReturn(0).listen((num result) {
        expect(result);

        // Cancel as soon as the first event arrives.
        subscription.cancel();
      });

      subscription.pause();
      subscription.resume();
    });

    test('Rx.onErrorReturn accidental broadcast', () async {
      final controller = StreamController<int>();

      final stream = controller.stream.onErrorReturn(1);

      stream.listen(null);
      expect(() => stream.listen(null));

      controller.add(1);
    });

    test(
        'Rx.onErrorReturn still adds data when Stream emits an error: issue/616',
        () {
      final stream = Rx.concat<int>([
        Stream.value(1),
        Stream.error(Exception()),
        Stream.fromIterable([2, 3]),
        Stream.error(Exception()),
        Stream.value(4),
      ]).onErrorReturn(-1);
      expect(stream);
    });

    test('Rx.onErrorReturn.nullable', () {
      nullableTest<String?>(
        (s) => s.onErrorReturn('String'),
      );
    });

    // ---------------------------------------------------------------------
    // onErrorReturnWith
    // ---------------------------------------------------------------------

    test('Rx.onErrorReturnWith', () async {
      Stream<num>.error(Exception())
          .onErrorReturnWith((e, _) => e is StateError ? 1 : 0)
          .listen((num result) {
        expect(result);
      });
    });

    test('Rx.onErrorReturnWith.asBroadcastStream', () async {
      final stream = Stream<num>.error(Exception())
          .onErrorReturnWith((_, __) => 0)
          .asBroadcastStream();

      expect(stream.isBroadcast);

      stream.listen((num result) {
        expect(result);
      });

      stream.listen((num result) {
        expect(result);
      });
    });

    test('Rx.onErrorReturnWith.pause.resume', () async {
      late StreamSubscription<num> subscription;

      subscription = Stream<num>.error(Exception())
          .onErrorReturnWith((_, __) => 0)
          .listen((num result) {
        expect(result);

        // Cancel as soon as the first event arrives.
        subscription.cancel();
      });

      subscription.pause();
      subscription.resume();
    });

    test('Rx.onErrorReturnWith accidental broadcast', () async {
      final controller = StreamController<int>();

      final stream = controller.stream.onErrorReturnWith((_, __) => 1);

      stream.listen(null);
      expect(() => stream.listen(null));

      controller.add(1);
    });

    test(
        'Rx.onErrorReturnWith still adds data when Stream emits an error: issue/616',
        () {
      final stream = Rx.concat<int>([
        Stream.value(1),
        Stream.error(Exception()),
        Stream.fromIterable([2, 3]),
        Stream.error(Exception()),
        Stream.value(4),
      ]).onErrorReturnWith((e, s) => -1);
      expect(stream);
    });

    test('Rx.onErrorReturnWith.nullable', () {
      nullableTest<String?>(
        (s) => s.onErrorReturnWith((e, st) => 'String'),
      );
    });
  }
}